06. 事件流、事件流 UI 与执行可视化
本章高频面试题
- 什么是事件流?为什么 Agent 系统需要事件流?
- 事件流和普通 request-response 的区别是什么?
- 事件流 UI 和日志面板有什么区别?
- 如何设计事件 schema,才能同时服务前端、调试和审计?
- SSE、WebSocket、HTTP streaming、HTTP/2 长连接分别适合什么场景?
- 如何保证事件顺序、去重和幂等?
- 如何让事件流可恢复、可回放、可重连?resumable stream 怎么实现?
- 并行步骤和多工具调用时,UI 如何展示最合理?
- 什么是 AG-UI?它定义了哪些核心事件类型?
- Generative UI / Tool call UI 是什么?
- Vercel AI SDK、CopilotKit、LangGraph useStream 应该怎么选?
1. 什么是事件流
事件流可以理解成:
系统在执行任务的过程中,不是一次性返回最终结果,而是持续不断地把过程中的状态变化和中间结果发送出来。
在 Agent 场景里,常见事件包括:
- run started / step started
- tool started / finished / failed
- message token streamed
- reasoning token streamed(如果模型暴露了 thinking)
- state delta(状态增量更新)
- waiting for human approval
- final result
2. 为什么 Agent 特别需要事件流
传统接口大多是:请求进来 → 处理 → 返回结果。
Agent 的特点:执行时间更长、中间步骤更多、状态变化更丰富、可能要调用多个工具、可能需要人工介入。
如果没有事件流,用户只能看到一个”加载中”。这带来两个问题:
- 用户体验很差
- 工程排障几乎没有抓手
所以事件流对 Agent 来说不是锦上添花,而是核心基础设施。
3. 什么是事件流 UI
事件流 UI 不是简单把后端日志显示出来。它真正要解决的是:
- 用户知道系统现在在做什么
- 用户知道为什么它还没结束
- 用户知道失败发生在哪一步
- 用户能在合适的时机介入(批准、中断、补充信息)
换句话说,事件流 UI 的核心不是”完整呈现所有底层事件”,而是:
把复杂的 Agent 执行过程翻译成用户能够理解的状态和反馈。
4. 事件 schema 应该怎么设计
一个成熟的事件 schema 至少要满足:
- 可排序
- 可关联(父子关系)
- 可重放
- 可审计
- 可聚合(多客户端看同一个 run)
常见设计:
type AgentEvent = {
eventId: string; // 全局唯一,幂等键
runId: string; // 任务唯一标识
threadId: string; // 会话标识,跨 run
parentId?: string; // 父事件 id,表达父子关系(tool call 属于哪个 step)
sequence: number; // 同一 run 内单调递增
timestamp: string; // ISO 8601
type:
| "run_started"
| "step_started"
| "step_completed"
| "tool_started"
| "tool_completed"
| "tool_failed"
| "token"
| "reasoning_token"
| "state_delta"
| "interrupt"
| "run_completed"
| "run_failed";
source: string; // 哪个 agent / node
status?: "pending" | "running" | "completed" | "failed";
payload: Record<string, unknown>;
// 可选的追踪字段
traceId?: string; // 对接 OTel
spanId?: string;
};几个关键字段:
runId串整个任务threadId跨 run(一个会话可能包含多次 run)sequence排序parentId表达父子关系type给 UI 和监控做语义分流traceId/spanId关联到 OTel,打通事件流和 observability
5. 如何处理顺序、去重和幂等
事件流一旦跨进程、跨网络,就不能假设”顺序天然正确且只来一次”。
5.1 生产端约束
- 每条事件有唯一
eventId - 同一 run 内
sequence单调递增(自增计数器或 Redis INCR) - 事件必须先持久化再推送(outbox pattern),避免推送成功但丢失
- 高频 token 事件可以批量发送,但要保持有序
5.2 消费端约束
- UI 按逻辑顺序展示,不按到达顺序盲目展示
- 基于
eventId去重(5 分钟 LRU cache 足够) - UI 更新逻辑必须幂等:
step_completed重复到达两次也不应该把 UI 搞坏;token 延迟到达不应该导致步骤状态回滚 - 对乱序 token 有两种策略:(a) 按
sequence排序后渲染,(b) append-only 渲染 + 最后一次 state_snapshot 兜底
5.3 Outbox Pattern
强一致性场景推荐 outbox pattern:
- 业务事务里同时写业务表和
events_outbox表 - 单独的 relay 进程读 outbox 推送到流
- 推送成功后标记为 published
好处:事件不会在业务成功后丢失,也不会业务失败但事件已发出。
6. 传输协议:SSE vs WebSocket vs HTTP streaming
6.1 SSE(Server-Sent Events)
基于 HTTP/1.1 chunked 或 HTTP/2,单向服务端推流。
优点:
- 实现简单
- 自动重连(浏览器原生)
Last-Event-ID头天然支持断点续传- 对文本流、状态流非常友好
- 大多数反向代理/CDN 默认支持
适合:聊天流式回答、单向事件通知、常规 Agent 执行展示。这是当前 Agent UI 最主流的选择。
6.2 WebSocket
优点:双向通信、实时控制能力强、适合高交互场景。 适合:用户实时中断任务、多人协作、高频双向控制、需要持续 ping/pong 的低延迟场景。 缺点:复杂、反向代理配置麻烦、serverless 环境不友好。
6.3 HTTP streaming + fetch()
Vercel AI SDK、Anthropic SDK 都在用这个模式:用 fetch() 读 ReadableStream,手动解析 SSE 或自定义 chunk 格式。
优点:serverless 友好(Edge runtime、Cloudflare Workers 都支持)、和 React Suspense/use() 集成自然。 这是目前 React 生态最推崇的方案。
6.4 轮询
适合:后台任务页、实时性要求不高、MVP 阶段。
6.5 怎么选
- 默认 SSE(或等价的 HTTP streaming),除非有明确双向需求
- 需要双向控制(用户随时打断、多人协作)→ WebSocket
- 需要异步长任务查询(分钟到小时级)→ 轮询或 webhook,不要用长连接
7. Resumable Stream:事件流的可恢复设计
很多系统只做”直播流”,却没做”恢复能力”。但用户会:
- 刷新页面
- 切后台
- 断网
- 稍后回来继续看
更稳的设计必须同时具备实时层和恢复层。
7.1 SSE 的天然恢复能力
SSE 规范里有 id: 字段和客户端重连时的 Last-Event-ID 请求头:
event: token
id: 123
data: {"token": "Hello"}
event: token
id: 124
data: {"token": " world"}客户端断开重连时,浏览器自动带上 Last-Event-ID: 124,服务端从 125 开始补发。这是原生支持的,不需要自己造。
7.2 Redis Streams 做事件存储
生产级实现通常这样:
- Agent runtime 把事件 append 到 Redis Stream(每个 run 一个 stream)
- SSE endpoint 从 Redis Stream 读取并推送(支持
XREAD BLOCK阻塞读) - 客户端重连时带
Last-Event-ID,服务端从对应 offset 继续 - 事件同时异步落库(Postgres)做长期存档
Vercel 2024 年开源的 Resumable Streams library 就是这个模式的封装。
7.3 架构要点
- 事件先持久化后推送,避免”推送成功但断连后丢失”
- Stream TTL 合理设置(活跃 run 保留,完成 run 24h 后归档到冷存储)
- 客户端持有
(runId, lastEventId),任何时刻都能恢复 - 前端优先”回放历史 → 接实时流”,避免闪烁
8. 并行执行如何展示
Agent 一旦支持并行检索、多工具并发、子任务并发,UI 就不能再只做一条线性时间轴。
常见做法:
- 顶层展示任务总进度
- 子区域展示并行步骤卡片
- 每个卡片展示状态、耗时、结果摘要
- 失败卡片可展开查看错误和重试信息
- Sub-agent 用折叠容器,默认折叠只显示汇报结果,工程师可展开看过程
设计原则:
用户默认看业务阶段,工程师展开看原始执行细节。
9. AG-UI:Agent ↔ UI 的事件语义标准
9.1 AG-UI 是什么
AG-UI 是一个 Agent 和前端 UI 交互的事件协议标准(ag-ui.com)。它的价值不在于”比 WebSocket 更高级”,而在于:
它标准化了 Agent 和 UI 之间应该交换哪些事件和语义。
可以这样区分:
- WebSocket / SSE:数据怎么传
- AG-UI:传的事件语义是什么
9.2 AG-UI 定义的核心事件类型
根据当前官方文档,AG-UI 的事件分为几大类:
生命周期事件
RunStarted/RunFinished/RunErrorStepStarted/StepFinished
文本消息事件
TextMessageStart/TextMessageContent/TextMessageEndTextMessageChunk(便利事件,组合 start+content+end)
工具调用事件
ToolCallStart/ToolCallArgs(流式参数)/ToolCallEndToolCallResult(工具执行结果)ToolCallChunk(便利事件)
状态管理事件
StateSnapshot(完整状态快照)StateDelta(JSON Patch 格式的增量更新)MessagesSnapshot(会话快照)
活动事件
ActivitySnapshot/ActivityDelta
推理事件
ReasoningStart/ReasoningMessageStart/Content/End/ReasoningEndReasoningEncryptedValue(加密的 CoT,用于 Anthropic extended thinking)
特殊事件
Raw(外部系统事件的容器)Custom(应用自定义扩展)
对面试/工程来说,理解三件事就够了:
- AG-UI 区分”消息流 / 工具调用 / 状态 / 推理”四大类事件
- 状态管理用 snapshot + delta(JSON Patch)模式,保证可恢复
- 推理事件单独分类,方便 UI 折叠/隐藏 CoT
10. Generative UI 和 Tool Call UI
这是 2024-2025 才成熟的模式:让 Agent 不仅返回文本,还返回可以渲染成 UI 组件的结构化结果。
10.1 Vercel AI SDK 的 Generative UI
用 streamUI / createStreamableUI:tool call 的结果直接是一个 React 组件(Server Component),前端自动流式渲染。例如”查机票”工具返回一张卡片组件,而不是 JSON。
10.2 CopilotKit 的 Tool call UI
每个 tool 可以注册对应的 renderer,agent 调 tool 时前端自动渲染对应组件(加载态、结果态、错误态)。
10.3 工程取舍
- Generative UI 让用户体验更丝滑,但把前端组件和 agent 耦合了
- 严格分层的项目(前端独立演进)通常仍用 JSON 结果 + 前端自行渲染
- Multi-frontend 场景(web + mobile + 桌面)不适合 Generative UI
11. 前端选型:useStream、Vercel AI SDK、CopilotKit
当前(2026-04)主流方案:
| 方案 | 适用场景 | 核心能力 |
|---|---|---|
LangGraph useStream | 后端是 LangGraph | 原生接 graph stream、自动处理 interrupt、thread 管理 |
| Vercel AI SDK | 后端任意,前端 React/Next.js | 统一的 useChat / useCompletion hooks、generative UI、多 provider |
| CopilotKit | 需要”把 Copilot 嵌入现有 app” | sidebar/textbox/autocomplete 组件、action 系统 |
| assistant-ui | 更灵活的聊天 UI 组件库 | 模型无关、Compose 风格、适合需要自定义交互 |
| 自研 | 特殊事件语义、非标准协议 | 最大灵活度 |
选型原则:
- 后端是 LangGraph 优先用
useStream,能省大量集成工作 - 需要跨 provider 的聊天组件用 Vercel AI SDK
- 产品形态是”嵌入式 copilot”而不是独立聊天页 → CopilotKit
- 以上都不贴合,再考虑自研
12. TypeScript 示例:带 Last-Event-ID 的 SSE 服务端
import type { IncomingMessage, ServerResponse } from "http";
import { redis } from "./redis";
type StreamEvent = {
eventId: string;
sequence: number;
type: string;
payload: Record<string, unknown>;
};
function writeSseEvent(res: ServerResponse, event: StreamEvent) {
res.write(`id: ${event.sequence}\n`);
res.write(`event: ${event.type}\n`);
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
export async function handleAgentStream(
req: IncomingMessage,
res: ServerResponse,
runId: string
) {
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no", // 禁用 nginx 缓冲
});
const lastEventId = req.headers["last-event-id"];
const startFrom = lastEventId ? `${lastEventId}` : "0";
const streamKey = `run:${runId}:events`;
// 1. 回放历史
const historical = await redis.xrange(streamKey, `(${startFrom}`, "+");
for (const [id, fields] of historical) {
const event = parseEvent(id, fields);
writeSseEvent(res, event);
}
// 2. 接实时流
let cursor = historical.length
? historical[historical.length - 1][0]
: startFrom;
const abort = new AbortController();
req.on("close", () => abort.abort());
while (!abort.signal.aborted) {
const newEvents = await redis.xread(
"BLOCK",
30_000,
"STREAMS",
streamKey,
cursor
);
if (!newEvents) continue;
for (const [, entries] of newEvents) {
for (const [id, fields] of entries) {
const event = parseEvent(id, fields);
writeSseEvent(res, event);
cursor = id;
if (event.type === "run_completed" || event.type === "run_failed") {
res.end();
return;
}
}
}
}
}
declare function parseEvent(id: string, fields: string[]): StreamEvent;这段代码体现的工程要点:
- 禁用代理缓冲(
X-Accel-Buffering: no) - 尊重
Last-Event-ID做断点续传 - Redis Stream 承载事件存储,先回放后接实时
- 阻塞读(
BLOCK 30000)避免轮询 - 客户端断连时
abort立即退出循环
13. 事件流 UI 的最佳实践
- 默认展示阶段,不展示原始事件洪流;高级面板可展开 tool trace、错误详情、重试记录
- 长文本流和步骤状态分开渲染——token 流 append-only,步骤状态用 state_delta 覆盖
- 终态必须明确区分成功、失败、取消、等待人工输入
- 刷新后优先恢复历史,再接实时流,避免闪烁
- Tool call 的参数和结果默认折叠,点开才展示完整 JSON
- Reasoning / CoT 事件默认隐藏或折叠,工程师可打开查看
- 审批卡片必须在事件流中显著标记(不是躺在底部等用户发现)
- 每个 tool call / step 关联
traceId,点击能跳转到 observability 工具
14. 本章方法论小结
- 事件流是 Agent 系统的核心基础设施,不是附加功能
- 事件流 UI 是执行过程的”用户化表达”,不是日志面板
- 事件 schema 必须同时服务前端、调试、监控和审计,含
runId/threadId/sequence/parentId/traceId - 顺序、去重、幂等必须提前设计;outbox pattern 是强一致场景的标配
- SSE 是当前默认选择,双向需求才上 WebSocket;HTTP streaming 在 serverless 场景最友好
- Resumable stream 必须一等公民:SSE 的
Last-Event-ID+ Redis Stream 事件存储是成熟方案 - AG-UI 定义了 Agent↔UI 的事件语义标准(lifecycle / message / tool / state / reasoning)
- Generative UI 让交互更丝滑但引入前后端耦合,按项目形态取舍
- 前端选型:后端 LangGraph 用
useStream,跨 provider 用 Vercel AI SDK,嵌入式 copilot 用 CopilotKit